feat(plan): optimize REPLACE index rewrites and fix rowid if panic#24163
feat(plan): optimize REPLACE index rewrites and fix rowid if panic#24163ck89119 wants to merge 6 commits intomatrixorigin:mainfrom
Conversation
df93ee5 to
7989b4b
Compare
There was a problem hiding this comment.
Pull request overview
This PR improves the REPLACE INTO planning/execution path in MatrixOne by reducing unnecessary unique-index work, avoiding redundant index rewrites, and aligning affected_rows behavior with MySQL REPLACE semantics. It also includes a follow-up stability fix (rowid support in if/iff and additional nil defenses) found during BVT.
Changes:
- Add
UpdateCtx.is_replaceand propagate it through compile/remote-run intomulti_updateso REPLACE can count deleted rows towardaffected_rows. - Optimize REPLACE planning: consume
skipUniqueIdx, add conditional projection to skip redundant index delete+insert, and add bounded static scan-filter pushdown forREPLACE ... VALUES. - Fix planner/executor panics by adding
rowidsupport toif/iffand adding defensive nil checks in optimizer helpers.
Reviewed changes
Copilot reviewed 14 out of 14 changed files in this pull request and generated 1 comment.
Show a summary per file
| File | Description |
|---|---|
| proto/plan.proto | Adds is_replace flag to UpdateCtx for REPLACE semantics. |
| pkg/pb/plan/plan.pb.go | Regenerated protobuf code for is_replace. |
| pkg/sql/plan/bind_replace.go | Implements REPLACE optimizations (skip unique dedup, conditional index rewrite, static filter pushdown). |
| pkg/sql/colexec/multi_update/types.go | Uses IsReplace to include delete rows in affected rows for REPLACE. |
| pkg/sql/compile/operator.go | Propagates IsReplace into VM multi_update context. |
| pkg/sql/compile/remoterun.go | Preserves IsReplace across pipeline serialization/deserialization. |
| pkg/sql/plan/deepcopy.go | Deep-copies IsReplace in UpdateCtx clones. |
| pkg/sql/plan/function/operatorSet.go | Adds rowid typing/execution support for if/iff. |
| pkg/sql/plan/function/operatorSet_test.go | Adds if(rowid, …) coverage. |
| pkg/sql/plan/opt_misc.go | Adds nil guards to avoid optimizer panic. |
| pkg/sql/plan/build_util.go | Adds nil-arg guard in boolean conversion helper. |
| pkg/sql/plan/build_util_test.go | Adds regression test for nil-arg guard. |
| pkg/sql/plan/build_test.go | Adds planner structure tests for new REPLACE optimizations. |
| pkg/sql/colexec/multi_update/affected_rows_test.go | Adds focused unit tests for affected-rows semantics incl. REPLACE. |
💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.
| valueExpr, err = binder.BindExpr(astExpr, 0, true) | ||
| if err != nil { | ||
| return nil, nil | ||
| } | ||
| if isEnumPlanType(&colDef.Typ) { | ||
| valueExpr, err = funcCastForEnumType(builder.GetContext(), valueExpr, colDef.Typ) | ||
| if err != nil { | ||
| return nil, err | ||
| } | ||
| } else if isSetPlanType(&colDef.Typ) { | ||
| valueExpr, err = funcCastForSetType(builder.GetContext(), valueExpr, colDef.Typ) | ||
| if err != nil { | ||
| return nil, err | ||
| } | ||
| } else if isGeometryPlanType(&colDef.Typ) { | ||
| valueExpr, err = funcCastForGeometryType(builder.GetContext(), valueExpr, colDef.Typ) | ||
| if err != nil { | ||
| return nil, err | ||
| } | ||
| } |
There was a problem hiding this comment.
Static scan filter pushdown is built from valueExprs bound from VALUES. If any valueExpr contains volatile/non-deterministic functions (e.g. rand(), uuid(), now()) and is reused in TABLE_SCAN FilterList, it can miss conflicts and break REPLACE semantics. Consider restricting collected values to deterministic runtime-constants (literals/params) or running a volatility check (similar to checkExprForVolatileFunc) and disabling this optimization when such functions appear.
| valueExpr, err = binder.BindExpr(astExpr, 0, true) | |
| if err != nil { | |
| return nil, nil | |
| } | |
| if isEnumPlanType(&colDef.Typ) { | |
| valueExpr, err = funcCastForEnumType(builder.GetContext(), valueExpr, colDef.Typ) | |
| if err != nil { | |
| return nil, err | |
| } | |
| } else if isSetPlanType(&colDef.Typ) { | |
| valueExpr, err = funcCastForSetType(builder.GetContext(), valueExpr, colDef.Typ) | |
| if err != nil { | |
| return nil, err | |
| } | |
| } else if isGeometryPlanType(&colDef.Typ) { | |
| valueExpr, err = funcCastForGeometryType(builder.GetContext(), valueExpr, colDef.Typ) | |
| if err != nil { | |
| return nil, err | |
| } | |
| } | |
| // Static scan filter pushdown for REPLACE must only reuse values that are | |
| // known to be deterministic runtime-constants. Binding an arbitrary VALUES | |
| // expression here can capture volatile functions such as rand(), uuid(), or | |
| // now(), which may cause conflict detection to diverge from the inserted row. | |
| // Conservatively disable the optimization when the value is not already | |
| // recognized as a direct constant expression. | |
| return nil, nil |
…#24179) ### Background PR #24153 introduced an `OldColCapture` mechanism in DEDUP JOIN to merge REPLACE INTO's two main-table scans into one. Each worker recorded probe-side old-column values into per-worker `capturedVecs` keyed by build bucket, intended to be emitted alongside the build row at finalize. That introduced a latent bug: parallel probe workers kept `capturedVecs` and `captured` as container-local state, but the `finalize()` multi-worker protocol only merged the `matched` bitmap through `ap.Channel`. Non-merger workers' captured values were silently dropped, so rows matched by those workers would emit NULL (or stale) in the placeholder slot. Commit `7cc544d2d` (`fix(replace): disable parallel probe for DEDUP JOIN with OldColCapture`) worked around this by forcing `Mcpu = 1` on every probe scope whenever `OldColCapture` was active (`pkg/sql/compile/compile.go`). Shuffle + capture additionally panics NYI. The REPLACE INTO merged-scan path has thus been running **without any probe-side parallelism** since. ### This PR Restore parallelism by upgrading the finalize merge protocol. 1. New `WorkerJoinMsg` struct in `pkg/sql/colexec/dedupjoin/types.go` carries `{matched, captured, capturedVecs}`. `DedupJoin.Channel` becomes `chan *WorkerJoinMsg`. 2. Non-merger workers relinquish ownership of their capture state to the merger when sending; the merger: - Or's in the matched bitmap (existing behavior). - Walks the worker's `captured` bitmap and, for each bucket not already present in its own `captured`, copies per-column values from the worker's `capturedVecs` into its own. First-wins semantics across workers — any one captured value for a bucket is semantically equivalent since HashOnUnique gives a 1:1 bucket↔build-row mapping. - Frees the worker's `capturedVecs` after merging (ownership was transferred). 3. Remove the `Mcpu = 1` forcing in `compile.go` so broadcast DedupJoin with OldColCapture runs parallel again. The shuffle + capture NYI panic stays — cross-CN pipeline channel semantics for `capturedVecs` are out of scope here and will be addressed separately. 4. The defensive capture-field copy in `dupOperator` (also from `7cc544d2d`) is retained as it's a prerequisite for parallel clones. ### Tests Added unit tests in `pkg/sql/colexec/dedupjoin/join_test.go`: - `TestMergeCaptured_DisjointBuckets` — parallel workers capture different buckets; after merge, merger owns the union. - `TestMergeCaptured_FirstWinsOnConflict` — when both workers captured the same bucket, merger keeps its own value. - `TestMergeCaptured_EmptyWorkerMsg` — worker with empty capture doesn't corrupt merger state. - `TestWorkerJoinMsg_ChannelRoundTrip` — full channel send/receive + merge + free cycle without leaks. - `TestReceiveWorkerMsg_ContextCancel` / `_ChannelClose` — receive helper respects context cancellation and closed channel. Existing end-to-end `TestDedupJoinCapture{,PartialMatch,Reset}` unchanged, all pass. `go test -race` clean. `make static-check` clean. ### Benchmark YCSB `workload-replace` (100K ops, updateproportion=1, zipfian, threads=16, single CN, one iteration): | metric | baseline (main) | this PR | delta | |--------------------------|----------------:|----------:|---------:| | RUN Throughput (ops/sec) | 1384.64 | 1561.67 | +12.8% | | UPDATE avg latency (us) | 11255.67 | 9978.82 | -11.3% | | UPDATE p95 (us) | 22527 | 18223 | -19.1% | | UPDATE p99 (us) | 53471 | 40895 | -23.5% | | UPDATE max (us) | 593407 | 260095 | -56.2% | Tail latency improvements (p99 -23%, max -56%) match the expected effect of restoring parallel probe: reduced queueing at the DEDUP JOIN stage under the high-conflict zipfian workload. ### Follow-ups (separate issues / PRs) - Enable shuffle + capture (needs cross-CN `Channel`/`WorkerJoinMsg` transport). - Re-evaluate PR #24163 (REPLACE static scan pushdown + IF-guard index rewrite) under the restored parallel baseline. Approved by: @aunjgr
What type of PR is this?
Which issue(s) this PR fixes:
issue #23946
What this PR does / why we need it:
This PR implements the REPLACE medium-term optimization (plan B) and includes a follow-up fix found during BVT validation.
Main changes:
skipUniqueIdxin REPLACE pathneedRewriteIdxin final projection.MULTI_UPDATEskips redundant index writes.REPLACE ... VALUESwithout explicit column list (and bounded rows), push OR-equality filters to PK dedup scan and single-part unique dedup scans.if/iffdid not supportrowid, while REPLACE usedif(needRewrite, old_rowid, null).rowidsupport toif/ifffunction typing/execution.exprCanRemoveProjectto avoid optimizer panic on malformed expressions.Added/updated tests:
pkg/sql/plan/build_test.goif(rowid)function test inpkg/sql/plan/function/operatorSet_test.goVerification:
go test ./pkg/sql/plan/function -run 'Test_Iff(Rowid|Check_MixedTypes)' -count=1go test ./pkg/sql/plan -run TestReplace -count=1go test ./pkg/sql/plan -count=1make static-check../mo-tester/run.sh -p /Users/LoveYY/Develop/matrixorigin/perf-replace-affected-rows/test/distributed/cases/dml/replace -g -n